View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: SessionConsumer.java,v 1.4 2007/01/24 12:00:28 tanderson Exp $
44   */
45  package org.exolab.jms.server;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.exolab.jms.client.JmsMessageListener;
50  import org.exolab.jms.message.MessageImpl;
51  import org.exolab.jms.messagemgr.Condition;
52  import org.exolab.jms.messagemgr.ConsumerEndpoint;
53  import org.exolab.jms.messagemgr.ConsumerEndpointListener;
54  import org.exolab.jms.messagemgr.Flag;
55  import org.exolab.jms.messagemgr.MessageHandle;
56  import org.exolab.jms.messagemgr.QueueBrowserEndpoint;
57  import org.exolab.jms.messagemgr.TimedCondition;
58  import org.exolab.jms.persistence.DatabaseService;
59  import org.exolab.jms.persistence.PersistenceException;
60  import org.exolab.jms.scheduler.Scheduler;
61  import org.exolab.jms.scheduler.SerialTask;
62  
63  import javax.jms.JMSException;
64  import java.rmi.RemoteException;
65  import java.util.ArrayList;
66  import java.util.HashMap;
67  import java.util.Iterator;
68  import java.util.LinkedList;
69  import java.util.List;
70  
71  
72  /***
73   * Manages all consumers for a session.
74   *
75   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
76   * @version $Revision: 1.4 $ $Date: 2007/01/24 12:00:28 $
77   */
78  class SessionConsumer implements ConsumerEndpointListener {
79  
80      /***
81       * The message listener is the reference to a remote client that will
82       * receive the messages.
83       */
84      private JmsMessageListener _listener;
85  
86      /***
87       * Maintain a set of ConsumerEndpoint instances, keyed on id.
88       */
89      private final HashMap _consumers = new HashMap();
90  
91      /***
92       * Caches all sent messages.
93       */
94      private final SentMessageCache _sent;
95  
96      /***
97       * The database service.
98       */
99      private final DatabaseService _database;
100 
101     /***
102      * The set of consumer endpoints with messages pending.
103      */
104     private final LinkedList _pending = new LinkedList();
105 
106     /***
107      * Determines if the sender is stopping/stopped.
108      */
109     private Flag _stop = new Flag(true);
110 
111     /***
112      * Stop/start lock.
113      */
114     private final Object _restartLock = new Object();
115 
116     /***
117      * The active consumer lock.
118      */
119     private final Object _removeLock = new Object();
120 
121     /***
122      * The consumer currently being dispatched to.
123      */
124     private long _consumerId = -1;
125 
126     /***
127      * The maximum number of messages that a dispatch can deliver at any one
128      * time
129      */
130     private final int MAX_MESSAGES = 200;
131 
132     /***
133      * The logger.
134      */
135     private static final Log _log = LogFactory.getLog(SessionConsumer.class);
136 
137 
138     private final SerialTask _runner;
139 
/**
 * Creates a <code>SessionConsumer</code>.
 * <p>
 * The sent message cache is created for the supplied acknowledgement mode,
 * and a serial task is set up that invokes <code>dispatch()</code> each
 * time it is run by the scheduler.
 *
 * @param ackMode   the message acknowledgement mode, or
 *                  <code>Session.TRANSACTED_SESSION</code>
 *                  if the session is transactional
 * @param database  the database service
 * @param scheduler the scheduler
 */
public SessionConsumer(int ackMode, DatabaseService database,
                       Scheduler scheduler) {
    _database = database;
    _sent = new SentMessageCache(ackMode);
    _runner = new SerialTask(new Runnable() {
        public void run() {
            dispatch();
        }
    }, scheduler);
}
161 
162     /***
163      * Set the listener for this session.
164      * <p/>
165      * The listener is notified whenever a message for the session is present.
166      *
167      * @param listener the message listener
168      */
169     public synchronized void setMessageListener(JmsMessageListener listener) {
170         _listener = listener;
171     }
172 
173     /***
174      * Register a consumer.
175      *
176      * @param consumer the consumer to add
177      */
178     public synchronized void addConsumer(ConsumerEndpoint consumer) {
179         final long id = consumer.getId();
180         _consumers.put(new Long(id), consumer);
181         consumer.setListener(this);
182     }
183 
184     /***
185      * Deregister a consumer.
186      *
187      * @param consumerId the consumer identifier
188      * @return the consumer
189      * @throws JMSException if the consumer can't be removed
190      */
191     public ConsumerEndpoint removeConsumer(long consumerId)
192             throws JMSException {
193         ConsumerEndpoint consumer;
194         synchronized (_removeLock) {
195             while (consumerId == _consumerId) {
196                 try {
197                     _removeLock.wait();
198                 } catch (InterruptedException ignore) {
199                     // do nothing
200                 }
201             }
202             synchronized (this) {
203                 consumer = (ConsumerEndpoint) _consumers.remove(
204                         new Long(consumerId));
205                 if (consumer == null) {
206                     throw new JMSException("No consumer with id=" + consumerId);
207                 }
208                 consumer.setListener(null);
209             }
210             synchronized (_pending) {
211                 _pending.remove(consumer);
212             }
213         }
214 
215         return consumer;
216     }
217 
218     /***
219      * Returns the consumers.
220      *
221      * @return the consumers
222      */
223     public synchronized ConsumerEndpoint[] getConsumers() {
224         return (ConsumerEndpoint[]) _consumers.values()
225                 .toArray(new ConsumerEndpoint[0]);
226     }
227 
228     /***
229      * Enable or disable asynchronous message delivery for a consumer.
230      *
231      * @param consumerId the consumer identifier
232      * @param enable     <code>true</code> to enable; <code>false</code> to
233      *                   disable
234      * @throws JMSException for any JMS error
235      */
236     public void setAsynchronous(long consumerId, boolean enable)
237             throws JMSException {
238         ConsumerEndpoint consumer = getConsumer(consumerId);
239         consumer.setAsynchronous(enable);
240         if (enable && consumer.getMessageCount() != 0) {
241             messageAvailable(consumer);
242         }
243 
244     }
245 
246     /***
247      * Stop message delivery.
248      */
249     public void stop() {
250         synchronized (_restartLock) {
251             _stop.set(true);
252             _runner.stop();
253             _log.debug("stopped delivery");
254         }
255     }
256 
257     /***
258      * Start message delivery.
259      */
260     public void start() throws JMSException {
261         synchronized (_restartLock) {
262             _log.debug("start");
263             _stop.set(false);
264             for (Iterator i = _consumers.values().iterator(); i.hasNext();) {
265                 ConsumerEndpoint consumer = (ConsumerEndpoint) i.next();
266                 if (needsScheduling(consumer)) {
267                     queue(consumer);
268                 }
269             }
270             try {
271                 _runner.schedule();
272             } catch (InterruptedException exception) {
273                 _log.error("Failed to start worker", exception);
274                 throw new JMSException("Failed to start worker: " + exception);
275             }
276         }
277     }
278 
279     /***
280      * Recover the session.
281      * <p/>
282      * This will cause all unacknowledged messages to be redelivered.
283      *
284      * @throws JMSException if the session can't be recovered
285      */
286     public synchronized void recover() throws JMSException {
287         stop();             // stop message delivery
288         try {
289             _database.begin();
290             _sent.clear();  // clear the messages in the sent message cache
291             _database.commit();
292         } catch (Exception exception) {
293             rethrow(exception.getMessage(), exception);
294         }
295         start();           // restart message delivery
296     }
297 
298     /***
299      * Commit the sesion.
300      * <p/>
301      * This will acknowledge all sent messages for all consumers.
302      *
303      * @throws JMSException if the session fails to commit
304      */
305     public synchronized void commit() throws JMSException {
306         try {
307             _database.begin();
308             _sent.acknowledgeAll();
309             _database.commit();
310         } catch (OutOfMemoryError exception) {
311             rethrow("Failed to commit session due to out-of-memory error",
312                     exception);
313         } catch (Exception exception) {
314             rethrow(exception.getMessage(), exception);
315         }
316     }
317 
318     /***
319      * Rollback the session.
320      * <p/>
321      * This will cause all unacknowledged messages to be redelivered.
322      *
323      * @throws JMSException for any error
324      */
325     public synchronized void rollback() throws JMSException {
326         stop();             // stop message delivery
327         try {
328             _database.begin();
329             _sent.clear();  // clear the messages in the sent message cache
330             _database.commit();
331         } catch (Exception exception) {
332             rethrow(exception.getMessage(), exception);
333         }
334         start();           // restart message delivery
335     }
336 
337     /***
338      * Return the next available mesage to the specified consumer.
339      * <p/>
340      * This method is non-blocking. If no messages are available, it will return
341      * immediately.
342      *
343      * @param consumerId the consumer identifier
344      * @return the next message or <code>null</code> if none is available
345      * @throws JMSException for any JMS error
346      */
347     public MessageImpl receiveNoWait(long consumerId) throws JMSException {
348         MessageImpl result = null;
349         if (!_stop.get()) {
350             result = doReceive(consumerId, null);
351         }
352         return result;
353     }
354 
355     /***
356      * Return the next available message to the specified consumer.
357      * <p/>
358      * This method is non-blocking. However, clients can specify a
359      * <code>wait</code> interval to indicate how long they are prepared to wait
360      * for a message. If no message is available, and the client indicates that
361      * it will wait, it will be notified via the registered {@link
362      * JmsMessageListener} if one subsequently becomes available.
363      *
364      * @param consumerId the consumer identifier
365      * @param wait       number of milliseconds to wait. A value of <code>0
366      *                   </code> indicates to wait indefinitely
367      * @return the next message or <code>null</code> if none is available
368      * @throws JMSException for any JMS error
369      */
370     public MessageImpl receive(long consumerId, long wait) throws JMSException {
371         MessageImpl result = null;
372         Condition condition;
373         if (wait > 0) {
374             condition = TimedCondition.before(wait);
375         } else {
376             condition = new Flag(true);
377         }
378         if (!_stop.get()) {
379             result = doReceive(consumerId, condition);
380         } else {
381             ConsumerEndpoint consumer = getConsumer(consumerId);
382             consumer.setWaitingForMessage(condition);
383         }
384         return result;
385     }
386 
387     /***
388      * Browse up to count messages.
389      *
390      * @param consumerId the consumer identifier
391      * @param count      the maximum number of messages to receive
392      * @return a list of {@link MessageImpl} instances
393      * @throws JMSException for any JMS error
394      */
395     public List browse(long consumerId, int count) throws JMSException {
396         ConsumerEndpoint consumer = getConsumer(consumerId);
397         if (!(consumer instanceof QueueBrowserEndpoint)) {
398             throw new JMSException("Can't browse messages: invalid consumer");
399         }
400 
401         List messages = new ArrayList(count);
402 
403         try {
404             _database.begin();
405             for (int i = 0; i < count && !_stop.get();) {
406                 MessageHandle handle = consumer.receive(_stop);
407                 if (handle == null) {
408                     break;
409                 }
410                 MessageImpl orig = handle.getMessage();
411                 if (orig != null) {
412                     messages.add(copy(orig, handle));
413                     ++i;
414                 }
415             }
416             _database.commit();
417         } catch (Exception exception) {
418             rethrow("Failed to browse messages", exception);
419         }
420         return messages;
421     }
422 
423     /***
424      * Acknowledge that a message has been processed.
425      *
426      * @param consumerId the identity of the consumer performing the ack
427      * @param messageId  the message identifier
428      * @throws JMSException for any error
429      */
430     public synchronized void acknowledge(long consumerId, String messageId)
431             throws JMSException {
432         try {
433             _database.begin();
434             _sent.acknowledge(messageId, consumerId);
435             _database.commit();
436         } catch (Exception exception) {
437             rethrow("Failed to acknowledge message", exception);
438         }
439     }
440 
441     /***
442      * Close the consumer.
443      *
444      * @throws JMSException for any eror
445      */
446     public synchronized void close() throws JMSException {
447         _log.debug("close");
448         stop();
449         _listener = null;
450         try {
451             _database.begin();
452             _sent.clear();
453             _database.commit();
454         } catch (Exception exception) {
455             rethrow(exception.getMessage(), exception);
456         }
457     }
458 
459     /***
460      * Notifies that a message is available for a particular consumer.
461      *
462      * @param consumer the consumer
463      */
464     public void messageAvailable(ConsumerEndpoint consumer) {
465         if (queue(consumer)) {
466             try {
467                 _runner.schedule();
468             } catch (InterruptedException exception) {
469                 _log.error("Failed to schedule worker", exception);
470             }
471         }
472     }
473 
474     /***
475      * Send messages to the client.
476      */
477     private void dispatch() {
478         final Condition timeout = TimedCondition.after(30 * 1000);
479         Condition done = new Condition() {
480             public boolean get() {
481                 return _stop.get() || timeout.get();
482             }
483         };
484 
485         _log.debug("dispatch");
486         int sent = 0;
487         while (sent < MAX_MESSAGES && !done.get()) {
488             ConsumerEndpoint consumer;
489             synchronized (_pending) {
490                 if (!_pending.isEmpty()) {
491                     consumer = (ConsumerEndpoint) _pending.removeFirst();
492                 } else {
493                     break;
494                 }
495             }
496             if (wantsMessages(consumer)) {
497                 if (consumer.isAsynchronous()) {
498                     if (send(consumer, done)) {
499                         ++sent;
500                     }
501                     if (needsScheduling(consumer)) {
502                         queue(consumer);
503                     }
504                 } else {
505                     notifyMessageAvailable();
506                 }
507             }
508         }
509         boolean empty;
510         synchronized (_pending) {
511             empty = _pending.isEmpty();
512         }
513         if (!empty && !_stop.get()) {
514             // reschedule this if needed
515             try {
516                 _runner.schedule();
517             } catch (InterruptedException exception) {
518                 _log.error("Failed to reschedule worker", exception);
519             }
520         }
521         _log.debug("dispatch[sent=" + sent + "]");
522     }
523 
524     private void notifyMessageAvailable() {
525         try {
526             // notify the client sesssion.
527             _listener.onMessageAvailable();
528         } catch (RemoteException exception) {
529             _log.debug("Failed to notify client", exception);
530         }
531     }
532 
533     private boolean queue(ConsumerEndpoint consumer) {
534         boolean queued = false;
535         if (!_stop.get()) {
536             synchronized (_pending) {
537                 if (!_pending.contains(consumer)) {
538                     _pending.add(consumer);
539                     queued = true;
540                 }
541             }
542         }
543         return queued;
544     }
545 
546     private boolean send(ConsumerEndpoint consumer, Condition cancel) {
547         boolean sent = false;
548         MessageHandle handle = null;
549         try {
550             _database.begin();
551             try {
552                 synchronized (_removeLock) {
553                     _consumerId = consumer.getId();
554                 }
555                 handle = consumer.receive(cancel);
556                 if (handle != null) {
557                     MessageImpl message = handle.getMessage();
558                     if (message != null) {
559                         // send the client a copy.
560                         message = copy(message, handle);
561 
562                         // clear any wait condition
563                         // @todo - possible race condition? Could
564                         // syncbronous client timeout and request again,
565                         // and this trash subsequent wait?
566                         consumer.setWaitingForMessage(null);
567 
568                         _sent.preSend(handle);
569                         _database.commit();
570 
571                         // send the message
572                         sent = send(message);
573 
574                         if (sent) {
575                             _database.begin();
576                             _sent.postSend(handle);
577                             _database.commit();
578                         }
579                     }
580                 } else {
581                     _database.commit();
582                 }
583             } finally {
584                 synchronized (_removeLock) {
585                     _consumerId = -1;
586                     _removeLock.notify();
587                 }
588             }
589         } catch (Exception exception) {
590             cleanup(exception.getMessage(), exception);
591         }
592         if (!sent && handle != null) {
593             try {
594                 _database.begin();
595                 handle.release();
596                 _database.commit();
597             } catch (Exception exception) {
598                 cleanup("Failed to release unsent message", exception);
599             }
600         }
601         return sent;
602     }
603 
604     /***
605      * Send the specified message to the client.
606      *
607      * @param message the message
608      * @return <code>true</code> if the message was successfully sent
609      */
610     protected boolean send(MessageImpl message) {
611         boolean delivered = false;
612         try {
613             // send the message to the listener.
614             delivered = _listener.onMessage(message);
615             if (_log.isDebugEnabled()) {
616                 _log.debug("send[JMSMessageID=" + message.getMessageId()
617                         + ", delivered=" + delivered + "]");
618             }
619         } catch (RemoteException exception) {
620             _log.info("Failed to notify client", exception);
621         }
622         return delivered;
623     }
624 
625     private boolean wantsMessages(ConsumerEndpoint consumer) {
626         boolean result = false;
627         if (consumer.isAsynchronous() || consumer.isWaitingForMessage()) {
628             result = true;
629         }
630         return result;
631     }
632 
633     private boolean needsScheduling(ConsumerEndpoint consumer) {
634         boolean result = false;
635         if (wantsMessages(consumer) && consumer.getMessageCount() != 0) {
636             result = true;
637         }
638         return result;
639     }
640 
641     private MessageImpl doReceive(long consumerId, final Condition wait)
642             throws JMSException {
643         ConsumerEndpoint consumer = getConsumer(consumerId);
644 
645         Condition cancel;
646         if (wait != null) {
647             cancel = new Condition() {
648                 public boolean get() {
649                     return _stop.get() || !wait.get();
650                 }
651             };
652         } else {
653             cancel = _stop;
654         }
655 
656         MessageImpl message = null;
657         try {
658             _database.begin();
659             MessageHandle handle = consumer.receive(cancel);
660 
661             if (handle != null) {
662                 // retrieve the message and copy it
663                 message = handle.getMessage();
664                 if (message != null) {
665                     message = copy(message, handle);
666                 }
667             }
668             if (message == null) {
669                 // no message available. Mark the consumer as (possibly) waiting
670                 // for a message.
671                 consumer.setWaitingForMessage(wait);
672             } else {
673                 // clear any wait condition
674                 consumer.setWaitingForMessage(null);
675 
676                 // if we have a non-null message then add it to the sent message
677                 // cache. Additionally, if we are part of a global transaction
678                 // then we must also send it to the ResourceManager for recovery.
679                 _sent.preSend(handle);
680             }
681             _database.commit();
682         } catch (Exception exception) {
683             rethrow(exception.getMessage(), exception);
684         }
685         if (_log.isDebugEnabled()) {
686             if (message != null) {
687                 _log.debug("doReceive(consumerId=" + consumerId +
688                         ") -> JMSMesssageID=" + message.getMessageId());
689             }
690         }
691 
692         return message;
693     }
694 
695     /***
696      * Helper to copy a message.
697      *
698      * @param message the message to copy
699      * @param handle  the handle the message came from
700      * @return a copy of the message
701      * @throws JMSException if the copy fails
702      */
703     private MessageImpl copy(MessageImpl message, MessageHandle handle)
704             throws JMSException {
705         MessageImpl result;
706         try {
707             result = (MessageImpl) message.clone();
708             result.setJMSRedelivered(handle.getDelivered());
709             result.setConsumerId(handle.getConsumerId());
710         } catch (JMSException exception) {
711             throw exception;
712         } catch (CloneNotSupportedException exception) {
713             _log.error(exception, exception);
714             throw new JMSException(exception.getMessage());
715         }
716         return result;
717     }
718 
719     /***
720      * Returns the consumer endpoint given its identifier.
721      *
722      * @param consumerId the consumer identifier
723      * @return the consumer endpoint corresponding to <code>consumerId</code>
724      * @throws JMSException if the consumer doesn't exist
725      */
726     private ConsumerEndpoint getConsumer(long consumerId)
727             throws JMSException {
728         ConsumerEndpoint consumer
729                 = (ConsumerEndpoint) _consumers.get(new Long(consumerId));
730         if (consumer == null) {
731             throw new JMSException("Consumer not registered: " + consumerId);
732         }
733         return consumer;
734     }
735 
736     /***
737      * Helper to clean up after a failed call.
738      *
739      * @param message   the message to log
740      * @param exception the exception to log
741      */
742     private void cleanup(String message, Throwable exception) {
743         _log.error(message, exception);
744         try {
745             if (_database.isTransacted()) {
746                 _database.rollback();
747             }
748         } catch (PersistenceException error) {
749             _log.warn("Failed to rollback after error", error);
750         }
751     }
752 
753     /***
754      * Helper to clean up after a failed call, and rethrow.
755      *
756      * @param message   the message to log
757      * @param exception the exception
758      * @throws JMSException the original exception adapted to a
759      *                      <code>JMSException</code> if necessary
760      */
761     private void rethrow(String message, Throwable exception)
762             throws JMSException {
763         cleanup(message, exception);
764         if (exception instanceof JMSException) {
765             throw (JMSException) exception;
766         }
767         throw new JMSException(exception.getMessage());
768     }
769 
770 }